儘管大家講coroutine都會提到channel和flow,然後開始比較,但我個人覺得channel和flow兩個你都沒有概念的時候,聽我比較很難知道我在說什麼啦!!
這邊會分段介紹channel的特性,首先,channel就像是一個queue,可以把它想像成羽毛球桶,先進先出(fifo),應該很好懂吧
所以打算先介紹兩者,而後再來比較,首先我們先用一下channel吧
val firstChannel = Channel<Int> { }
val secondChannel = Channel<Int> { }
lifecycleScope.launch {
Timber.d("start")
val waiting = firstChannel.receive()
Timber.d("check waiting, $waiting")
val waiting2 = secondChannel.receive()
Timber.d("check waiting2, $waiting2")
Timber.d("after clair receive")
val result = async {
Timber.d("in result")
sampleDemoSuspendFunction("$waiting result")
}
val result2 = async {
Timber.d("in result 2 ")
sampleDemoSuspendFunction("$waiting2 result2")
}
Timber.d("${result.await()} , ${result2.await()}")
}
lifecycleScope.launch(Dispatchers.IO){
delay(1000)
firstChannel.send(1001)
Timber.d("sent first")
delay(1000)
secondChannel.send(1111 )
Timber.d("sent second")
}
suspend fun sampleDemoSuspendFunction(content: String): String {
delay(1000L)
return content
}
/**
*
hh:mm:ss
14:44:35.792 17905-17967/: sent first
14:44:35.792 17905-17905/: check waiting, 1001
14:44:36.793 17905-17967/: sent second
14:44:36.793 17905-17905/: check waiting, 1111
14:44:36.794 17905-17905/: after clair receive
14:44:36.794 17905-17905/$1$1$result: in result
14:44:36.795 17905-17905/d$1$1$result2: in result 2
14:44:37.798 17905-17905/: 1001 result , 1111 result2
* */
可以看到channel的關鍵字有send和receive兩個,在Timber裡面,我們要印出waiting和waiting2兩個變數,這就表示我們需要兩個變數都已經有值了,才去呼叫Timber
我已經把log都印出來了,從log可以看到先送出東西到channel,receive()才會被觸發,現在我們來講講send和receive的特性,receive很好懂,沒東西的時候就suspend起來,等到有東西為止,但send就不一樣了,send的時候如果沒有被receive,他也suspend在那邊等,等到有人收他東西,那如果沒人收呢,就沒有然後了!!
比如上面把兩個receive交換,secondChannel再等一個等不到的人,因為firstChannel沒有被接收,他suspend在那裏了
這個特性非常重要,如果把send和receive放在同一個thread,就會suspend起來,然後又沒有然後了QQ,用的位子百百種,使用時記得思考suspend的特性,send/receive有沒有對起來,規則告訴你了,剩下就讓各位發揮
lifecycleScope.launch {
launch {
for (x in 1..5) firstChannel.send(x)
}
repeat(5) { Timber.d(firstChannel.receive().toString()) }
Timber.d("Done!")
}
和corotuine的兩段式取消有點類似,當呼叫cancel()時,並不會立刻關閉,而是會等到先前發出的值都被接收後才會關閉,值得注意的一點,他有分send和receive,cancel()呼叫後不能send
channel的建構器可以用produce,而 for 在接收端可以用consumeEach有限制的替換,先看下面範例
lifecycleScope.launch {
val produceChannel = produce {
for (x in 1..5) send( x )
}
produceChannel.consumeEach { Timber.d(it.toString()) }
}
上面的例子應該表現出channel的基本特性了,但這並沒有解決coroutine之間溝通的問題,channel的特性在於可以再多個coroutine之間 send和receive
這裡再借文檔例子
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
/**
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
* */
給看不懂的人在解釋一下,多核心的時候,可以同時進行多個工作,用channel可以保證,變數的順序和更新
那剛剛說的consumeEach的限制,就是for在其中一個coroutine失敗時,可以安全的持續接收值直到結束,但comsumeEach在遇到,會無法接收到完整的資料
val firstChannel = Channel<Int>(10)//buffer 有10個
lifecycleScope.launch {
(1..10).forEach { firstChannel.send(it) }
val one = async { launchProcessor(1,firstChannel) }
val two = async { launchProcessor(2,firstChannel) }
val three = async { launchProcessor(3,firstChannel) }
one.await()
two.await()
three.await()
firstChannel.cancel() // cancel producer coroutine and thus kill them all
}
with for loop,這裡會少3是因為在for loop裡面,已經把3拿出來了,才 throw Exception,所以在channel裡面才會沒有3
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for ( msg in channel) {
if (msg == 3) throw CancellationException()
Timber.d("Processor #$id received $msg")
delay(500)
}
}
/**
*
Processor #1 received 1
Processor #2 received 2
Processor #1 received 4
Processor #2 received 5
Processor #1 received 6
Processor #2 received 7
Processor #1 received 8
Processor #2 received 9
Processor #1 received 10
* */
with consume
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
channel.consumeEach { msg ->
if (msg == 3) throw CancellationException()
Timber.d("Processor #$id received $msg")
delay(500)
}
}
/**
* Processor #1 received 1
* Processor #2 received 2
* */
最後,channel會盡可能的公平操作,ex.有兩個先receive之後再send的coroutine,他們會交替的被呼叫,而不會在一個裡面重複,文檔有範例,這邊有例外討論
在看官方影片的時候,剛好看到他們有做比較,真是太貼心了
只用list坐回傳的話順序是
用channel回傳的順序是交替的,你會依順序一個一個收到,而不是一次收到一串
如果沒被receive,在scope終止前,會一直等下去,不會有"30歲還單身我們結婚吧?" 這種事情
KotlinConf 2019 前面有講到channel
englidh doc
chinese doc